#!/usr/pkg/bin/python """Open Street Map file parser and editor understands the format of osm gpx files and allows command-line editing actually only made to work with data imported from TIGER using the tiger-to-osm.sh script partial glossary: CFCC: Census Feature Class Code TLID: Tiger Line ID """ Copyright = """ osm - OpenStreetMap.org file parser and editor Copyright (C) 2007 John Comeau This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. """ errormessage = "Not all needed libraries found, upgrade or check path: " try: True # not defined in older Python releases except: True, False = 1, 0 try: import sys, os, types, re, pwd sys.path.append(os.path.join(pwd.getpwuid(os.geteuid())[5], 'lib', 'python')) errormessage = errormessage + repr(sys.path) from com.jcomeau import gpl, jclicense except: try: sys.stderr.write("%s\n" % errormessage) except: print errormessage raise # get name this program was called as myself = os.path.split(sys.argv[0])[1] command = os.path.splitext(myself)[0] # chop any suffix (extension) # now get name we gave it when we wrote it originalself = re.compile('[0-9A-Za-z]+').search(Copyright).group() # globals and routines that should be in every program # (yes, you could import them, but there are problems in that approach too) def DebugPrint(*whatever): return False # defined instead by pytest module, use that for debugging def join(*args): "for pythons without str.join" string, array = args if type(array) == types.StringType: array = eval(array) if hasattr(str, 'join'): return string.join(array) else: joined = '' for index in range(0, len(array)): joined = joined + array[index] if index != (len(array) - 1): joined = joined + string return joined def split(*args): "for pythons without str.split" string, string_to_split = args if not len(string): string = None if hasattr('str', 'split'): return string_to_split.split(string) else: return re.compile(re.escape(string)).split(string_to_split) # other globals, specific to this program from signal import * from time import * import httplib, base64 osmdata = { 'stack': [], 'dict': {}, 'bytes': 0, 'server': 'www.openstreetmap.org', } element_pattern = re.compile("]*(?:/|>)") property_pattern = re.compile("""\s+([a-z]+)=(['"])(.*?)(\\2)""") endpattern = re.compile('.*?(?:/|\s*$') formatsingle = '\t<%s%s/>' formatmultiple = '<%s%s>' formatend = '' multiples = ['node', 'segment', 'way', 'osm'] def dump(*args): sys.stderr.write('%s\n' % repr(osmdata)) def osminit(*args): sys.setrecursionlimit(10000) signal(SIGUSR1, dump) if len(args) < 1: osmdata['source'] = sys.stdin else: file = open(args[0]) osmdata['source'] = file def stackadjust(): #DebugPrint('ending element %s' % osmdata['stack'][-1][0]) try: osmdata['stack'][-2].append(osmdata['stack'][-1]) osmdata['stack'].pop(-1) except: if len(osmdata['stack']) != 1 or osmdata['stack'][0][0] != 'osm': sys.stderr.write('bad stack: %s\n' % repr(osmdata['stack'])) def write(*args): """write out an OSM GPX file takes as arguments the tags osm|way|segment|node, or IDs of nodes, segments, or ways""" if not len(osmdata['stack']): osminit(args[0]) args = args[1:] load() sys.stdout.write('%s\n' % "") stackpicture = repr(map(lambda a: a[0], osmdata['stack'][0][1:])) DebugPrint('stack: %s' % stackpicture) dumpstack(list(osmdata['stack'])) # list() makes a copy def newnode(nodefile): """nodefile must be in the correct OSM 0.4 XML format alternatively, the data itself can be in the commandline""" return httprequest('PUT', '/api/0.4/node/create', nodefile) def deletenode(nodenumber): """find out the nodenumber by downloading data from OSM make sure no segments depend on that node!""" return httprequest('DELETE', '/api/0.4/node/%s' % nodenumber, None) def httprequest(*args): DebugPrint('args', args) while len(args) < 3: args += (None,) try: content = open(args[2]).read() except: content = args[2] # not the name of a file, just use as is DebugPrint("sending data '%s'" % content) connection = httplib.HTTPConnection(osmdata['server']) connection.request(args[0], args[1], content, {'authorization': authinfo(osmdata['server'])}) response = connection.getresponse() responseheaders = dict(response.getheaders()) DebugPrint(response.status, response.reason, responseheaders) if False and 'set-cookie' in responseheaders.keys(): # make dict of cookie request components = re.compile('\s*;\s*').split(responseheaders['set-cookie']) components = map(lambda s: s.split('='), components) DebugPrint(components) connection.putrequest(args[0], args[1], args[2]) connection.putheader('cookie', dict(components)[components[0][0]]) connection.putheader('authorization', authinfo(osmdata['server'])) connection.endheaders() return response.read() def authinfo(server): authfile = open(os.path.join(os.getenv('HOME'), '.netrc')) authdata = authfile.readlines() authfile.close() pattern = re.compile( '^machine\s+%s\s+login\s+(\S+)\s+password\s+(\S+)\s*$' % server) username, password = filter( lambda result: result, map(pattern.match, authdata))[0].groups() return 'Basic ' + base64.encodestring(username + ':' + password).strip() def negateids(*args): """negate all IDs in the records on the stack otherwise OSM might reject them""" if not len(osmdata['stack']): osminit(args[0]) args = args[1:] load() for record in osmdata['stack'][0]: DebugPrint('record', record, 'fields', map(lambda item: item[0], record[1])) if record[0] in ['node', 'segment', 'way']: for field in record[1]: if field[0] == 'id': field[1] = '%d' % -(abs(int(field[1]))) if len(record) > 2: for field in record[2:]: DebugPrint('record[2] field', field) if field[0] == 'seg': for properties in field[1]: if properties[0] == 'id': properties[1] = '%d' % -(abs(int(properties[1]))) def setreviewed(*args): """set all 'tiger:reviewed' tags to 'yes' otherwise OSM might reject them""" if not len(osmdata['stack']): osminit(args[0]) args = args[1:] load() for record in osmdata['stack'][0]: if record[0] in ['node', 'segment', 'way']: DebugPrint('record', record, 'fields', map(lambda item: item[0], record[1])) for field in record[2:]: DebugPrint('field', field) if field[0] == 'tag' and field[1][0] == ['k', 'tiger:reviewed']: field[1][1][1] = 'yes' def getbbox(*args): """fetch all nodes, segments, and ways within a particular bounding box from the OSM servers. args are: min_lon, min_lat, max_lon, max_lat """ coordinates = tuple(map(float, args)) return httprequest('GET', '/api/0.4/map?bbox=%f,%f,%f,%f' % coordinates) def tigerbbox(*args): """get the subset of a TIGER file within a particular bounding box use the same syntax as in bbox in the OSM API: minimum_longitude, minimum_latitude, maximum_longitude, maximum_latitude """ if not len(osmdata['stack']): osminit(args[0]) args = args[1:] load() nodes, segments, ways = [], [], [] min_lon, min_lat, max_lon, max_lat = map(float, args) DebugPrint('first getting the nodes within the boundaries') for record in osmdata['stack'][0]: addnode, id = True, 0 if record[0] == 'node': #DebugPrint('record', record, 'fields', map(lambda item: item[0], record[1])) for field in record[1]: #DebugPrint('field', field) if field[0] == 'lat': lat = float(field[1]) #DebugPrint('comparing latitude %f to %f and %f' % (lat, min_lat, max_lat)) if lat < min_lat or lat > max_lat: addnode = False elif field[0] == 'lon': lon = float(field[1]) if lon < min_lon or lon > max_lon: addnode = False elif field[0] == 'id': id = int(field[1]) if addnode and (id != 0): nodes.append(id) #DebugPrint('nodes', nodes) DebugPrint('now getting the segments containing ONLY those nodes') for record in osmdata['stack'][0]: id = 0 if record[0] == 'segment': #DebugPrint('record', record, 'fields', map(lambda item: item[0], record[1])) for field in record[1]: #DebugPrint('field', field) if field[0] == 'from': seg_from = float(field[1]) elif field[0] == 'to': seg_to = float(field[1]) elif field[0] == 'id': id = int(field[1]) if seg_from in nodes and seg_to in nodes and id != 0: segments.append(id) #DebugPrint(nodes, segments) DebugPrint('now removing unneeded segs from ways, and ways without segs') for record in osmdata['stack'][0]: if record[0] == 'way': DebugPrint('record', record) segs = filter(lambda field: field[0] == 'seg', record[1:]) DebugPrint('segs', segs) segs = filter(lambda seg: int(dict(seg[1:])['id']) in segments, segs) DebugPrint('segs after filter', segs) def addtimestamps(*args): """add timestamp fields to all 'node' and 'segment' records otherwise OSM might reject them""" if not len(osmdata['stack']): osminit(args[0]) args = args[1:] load() for record in osmdata['stack'][0]: if record[0] in ['node', 'segment']: DebugPrint('record', record, 'fields', map(lambda item: item[0], record[1])) if not 'timestamp' in map(lambda item: item[0], record[1]): record[1].append(['timestamp', timestamp()]) write() def getroad(*args): """extract a single road (from "name" tag) from an OSM file write it out to a new OSM file""" if not len(osmdata['stack']): osminit(args[0]) args = args[1:] load() newstack, nodes = [list(osmdata['stack'])[0][0:2]], [] for roadname in args: sys.stderr.write('extracting road: %s\n' % roadname) for way in filter(lambda item: item[0] == 'way', osmdata['stack'][0]): if ['tag', [['k', 'name'], ['v', roadname]]] in way: newstack[0].append(way) for seg in filter(lambda item: item[0] == 'seg', way): DebugPrint('segment', seg[1][0][1]) if seg[1][0][0] == 'id': id = seg[1][0][1] segment = filter(lambda item: item[0] == 'segment' and \ ['id', id] in item[1], osmdata['stack'][0]) DebugPrint('found segment', segment) newstack[0].insert(2, segment[0]) from_node = filter( lambda property: property[0] == 'from', segment[0][1])[0][1] to_node = filter( lambda property: property[0] == 'to', segment[0][1])[0][1] nodes += [from_node, to_node] DebugPrint('nodes', nodes) for nodenumber in dict(map(None, nodes, nodes)).keys(): node = filter(lambda item: item[0] == 'node' and \ ['id', nodenumber] in item[1], osmdata['stack'][0]) newstack[0].insert(2, node[0]) osmdata['stack'] = newstack negateids() setreviewed() write() def formatproperties(tag, *properties): DebugPrint('properties', properties) properties = properties[0] # remove outer tuple result = '' if len(properties) > 0: for property in properties: key, value = property delimiter = "'" if tag == 'tag' or "'" in value: delimiter = '"' result += ' %s=%s%s%s' % (key, delimiter, value, delimiter) return result def dumpstack(*args): """write out stack in order it was created ideally this should give a file identical to the original""" stack, index = args[0], 2 DebugPrint('stack: %d elements' % len(stack[0])) topleveltag, toplevelproperties = stack[0][0:2] dumptoplevel(topleveltag, toplevelproperties) while index < len(stack[0]): item = stack[0][index] tag, properties, other = item[0], item[1], item[2:] DebugPrint('dumping item:', tag, properties, other) dumpmultiple(tag, properties, other) index += 1 print formatend % topleveltag def dumptoplevel(tag, properties): formatstring = formatmultiple print formatstring % (tag, formatproperties(tag, properties)) def timestamp(): return strftime('%Y-%m-%dT%H:%M:%SZ', gmtime()) def dumpmultiple(tag, properties, other): formatstring = formatmultiple if len(other) == 0: print (formatstring + formatend) % (tag, formatproperties(tag, properties), tag) else: print formatstring % (tag, formatproperties(tag, properties)) for singletag, singleproperties in other: dumpsingle(singletag, singleproperties) print formatend % tag def dumpsingle(tag, properties): formatstring = formatsingle print formatstring % (tag, formatproperties(tag, properties)) def progress(): if not DebugPrint(): stackpicture = repr(map(lambda a: a[0], osmdata['stack'])) message = 'bytes read: %d; stack: %s' % (osmdata['bytes'], stackpicture) message = message + (' ' * (78 - len(message))) sys.stderr.write('%s\r' % message) def makedict(): """turn stack into a dict(ionary) of IDs to make property lookups easier""" DebugPrint(osmdata['stack'][0][0]) DebugPrint(osmdata['stack'][0][1]) osmdata['dict'][osmdata['stack'][0][0]] = dict(osmdata['stack'][0][1]) DebugPrint(osmdata['dict']) def load(): while True: line = osmdata['source'].readline() osmdata['bytes'] += len(line) element = element_pattern.search(line) if element: osmtype = element.groups()[0] if element.start() > 0 and not line[0:element.start()].isspace(): DebugPrint('skipped', line[0:element.start()]) if element.group().startswith('